Skip to content

Conversation

@Chenyaaang
Copy link
Collaborator

Description

The implementation for Pipeline Parallelism are splitted into the following small PRs.

  • Util functions for PP on Jax (Create Utils functions for PP #1042)
  • worker changes (The current PR)
  • runner changes
  • platform changes
  • torchax changes
  • Jax changes
  • Single host changes (this will be in vLLM)
  • Multi host changes (Ray)

This PR is to modify Jax worker to support PP.

  • The worker __init__ takes in the current worker's IP and its previous worker's IP, to start transfer server and connection later.
  • During execute_model, for the PP workers who not in the first rank, they need to receive intermediate tensor from the previous worker. For the PP workers who is not the last rank, they need to send intermediate tensor to their next worker.
  • The profiler should profile every PP worker, so add subfolders under the parent profile_dir to save profiles for each worker.

Tests

E2E test has verified the whole PP implementation works properly.

Checklist

Before submitting this PR, please make sure:

  • I have performed a self-review of my code.
  • I have necessary comments in my code, particularly in hard-to-understand areas.
  • I have made or will make corresponding changes to any relevant documentation.

@github-actions
Copy link

github-actions bot commented Nov 7, 2025

Description

Start with a short description of what the PR does and how this is a change from
the past.

The rest of the description includes relevant details and context, examples:

  • why is this change being made,
  • the problem being solved and any relevant context,
  • why this is a good solution,
  • some information about the specific implementation,
  • shortcomings of the solution and possible future improvements.

If the change fixes a bug or a Github issue, please include a link, e.g.,:
FIXES: b/123456
FIXES: #123456

Tests

Please describe how you tested this change, and include any instructions and/or
commands to reproduce.

Checklist

Before submitting this PR, please make sure:

  • I have performed a self-review of my code.
  • I have necessary comments in my code, particularly in hard-to-understand areas.
  • I have made or will make corresponding changes to any relevant documentation.

Signed-off-by: Chenyaaang <[email protected]>
self.step_counter += 1
return None
else:
self.step_counter += 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this step_counter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to generate uuid, we want it to be unique between each run and each worker, so we hash scheduler_output, step and worker_rank.

Copy link
Collaborator

@yixinshi yixinshi Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better explain this in line 125

multihost_backend = os.environ.get("TPU_MULTIHOST_BACKEND", "").lower()
if multihost_backend != "ray" and self.parallel_config.pipeline_parallel_size > 1:
# Note: Below is the setting for v6e8 host (8 chips of v6e)
# There are 2 ways of subslicing a v6e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need report errors if the settings are not in these 2 ways?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use v6e8 as an example to provide 2 ways to subslice the chips. I was thinking if the customer is using other chips, they should replace with their own topology. Do you have any better idea to interpret this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the topology be passed as config variables (and make the default as one of v6e's supported topology) or at least a parameter of init_device() so people would find the needed changes more easily? And please move lines 136-141 to line 152.

Signed-off-by: Chenyaaang <[email protected]>
@Chenyaaang
Copy link
Collaborator Author

@sixiang-google Hi Xiang, can you help me take a look at init_device() will my change to local_devices affect disagg serving? Thanks

# For PP, we use MPMD so we want to profile every worker.
if self.pp_world_size > 1 and envs.VLLM_TORCH_PROFILER_DIR:
self.profile_dir = os.path.join(envs.VLLM_TORCH_PROFILER_DIR,
f"rank_{self.rank}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure the convention here but this might be more informative? f"rank_{self.rank}_{self.pp_world_size}"


assert jax.local_device_count(
) >= sharding_config.total_devices
self.devices = jax.local_devices()[:sharding_config.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: combine line 183 and 184 to improve the readability?

self.rank, self.rank == 0,
self.rank == self.pp_world_size - 1)
logger.info(f"Init worker | "
f"rank={self.rank} | "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add world_size as well?

Copy link
Collaborator

@yixinshi yixinshi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work! A general comment: shall we have more specific PR title here for each PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants